Support drop_table, partitions and offsets methods in python bindings #150
luoyuxia merged 4 commits into apache:main
Pull request overview
This PR adds two administrative operations to the Python bindings to achieve feature parity with C++ bindings: drop_table() for deleting tables and list_offsets() for querying bucket offsets. Both methods follow existing async patterns and leverage core APIs that already exist in the Rust codebase.
Changes:
- Added Admin.drop_table() method with optional ignore_if_not_exists parameter
- Added Admin.list_offsets() method supporting earliest, latest, and timestamp-based offset queries
- Introduced OffsetType class with string constants for type-safe offset type specification
- Updated example.py to demonstrate both new features
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| bindings/python/src/lib.rs | Adds OffsetType class definition with string constants and registers it in the Python module |
| bindings/python/src/admin.rs | Implements drop_table() and list_offsets() admin methods with validation and error handling |
| bindings/python/example/example.py | Demonstrates usage of new methods with both string literals and OffsetType constants |
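The overview mentions that offset types are passed as strings and validated in admin.rs. A minimal Rust sketch of what such validation could look like follows. The names `OffsetSpec`, `OffsetSpecError`, and `parse_offset_spec` are hypothetical and are not taken from the PR; the case-insensitive matching and the rejection of a timestamp for earliest/latest queries are assumptions made for illustration.

```rust
use std::fmt;

/// Hypothetical offset query kinds, mirroring the three modes named in the overview.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OffsetSpec {
    Earliest,
    Latest,
    /// Timestamp-based lookup; the unit is whatever the caller supplies.
    Timestamp(i64),
}

/// Hypothetical validation errors for an offset query.
#[derive(Debug, PartialEq, Eq)]
pub enum OffsetSpecError {
    UnknownType(String),
    MissingTimestamp,
    UnexpectedTimestamp,
}

impl fmt::Display for OffsetSpecError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::UnknownType(t) => write!(f, "unknown offset type: {t}"),
            Self::MissingTimestamp => write!(f, "timestamp is required for timestamp offsets"),
            Self::UnexpectedTimestamp => write!(f, "timestamp is only valid for timestamp offsets"),
        }
    }
}

/// Turns a string offset type plus an optional timestamp into a typed value.
/// Assumption: matching is case-insensitive, and a timestamp passed with
/// "earliest" or "latest" is rejected rather than silently ignored.
pub fn parse_offset_spec(
    offset_type: &str,
    timestamp: Option<i64>,
) -> Result<OffsetSpec, OffsetSpecError> {
    match (offset_type.to_ascii_lowercase().as_str(), timestamp) {
        ("earliest", None) => Ok(OffsetSpec::Earliest),
        ("latest", None) => Ok(OffsetSpec::Latest),
        ("timestamp", Some(ts)) => Ok(OffsetSpec::Timestamp(ts)),
        ("timestamp", None) => Err(OffsetSpecError::MissingTimestamp),
        ("earliest" | "latest", Some(_)) => Err(OffsetSpecError::UnexpectedTimestamp),
        (other, _) => Err(OffsetSpecError::UnknownType(other.to_string())),
    }
}
```

Parsing once at the binding boundary means the rest of the call path works with a typed value, and a bad string surfaces as a single, descriptive error.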
resolved conflict
@luoyuxia PTAL 🙏
I'll add partition support, since I see we added it to the C++ bindings.
@luoyuxia I've reworked this PR to support partitioned tables and to expose the same methods the C++ bindings have at the moment.
I think #246 should be merged first, and then I'll rebase one more time.
@fresh-borzoni Sorry for missing it. I'll review this weekend.
@fresh-borzoni Could you please rebase it?
@luoyuxia rebased and renamed
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
        let cache = self.partition_name_cache.read().unwrap();
        if let Some(map) = cache.as_ref() {
            return Ok(map.clone());
        }
    }

    // Fetch partition infos (releases GIL during async call)
    let partition_infos: Vec<fcore::metadata::PartitionInfo> = py
        .detach(|| {
            TOKIO_RUNTIME.block_on(async { self.admin.list_partition_infos(table_path).await })
        })
        .map_err(|e| FlussError::new_err(format!("Failed to list partition infos: {e}")))?;

    // Build and cache the mapping
    let map: HashMap<i64, String> = partition_infos
        .into_iter()
        .map(|info| (info.get_partition_id(), info.get_partition_name()))
        .collect();

    // Store in cache (write lock)
    {
        let mut cache = self.partition_name_cache.write().unwrap();
The unwrap() calls on RwLock::read() and RwLock::write() will panic if the lock is poisoned (which happens when a thread panics while holding the lock). While poison is rare in Python bindings due to controlled execution, it's better to handle this gracefully. Consider using expect() with a descriptive message or mapping to a PyErr using map_err() instead of unwrap().
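The suggestion above can be sketched with the standard library alone. This is a minimal illustration, not the PR's code: `PartitionNameCache` is a hypothetical stand-in for the struct that owns `partition_name_cache`, and a plain `String` error stands in for the `PyErr` that the real binding would produce.

```rust
use std::collections::HashMap;
use std::sync::{PoisonError, RwLock};

/// Hypothetical stand-in for the binding's partition-name cache.
pub struct PartitionNameCache {
    inner: RwLock<Option<HashMap<i64, String>>>,
}

impl PartitionNameCache {
    pub fn new() -> Self {
        Self { inner: RwLock::new(None) }
    }

    /// Option 1: surface a poisoned lock as an error instead of panicking.
    /// In the real binding the String would be mapped to a Python exception.
    pub fn get(&self) -> Result<Option<HashMap<i64, String>>, String> {
        let guard = self
            .inner
            .read()
            .map_err(|e| format!("partition name cache lock poisoned: {e}"))?;
        Ok((*guard).clone())
    }

    /// Same error mapping on the write path.
    pub fn set(&self, map: HashMap<i64, String>) -> Result<(), String> {
        let mut guard = self
            .inner
            .write()
            .map_err(|e| format!("partition name cache lock poisoned: {e}"))?;
        *guard = Some(map);
        Ok(())
    }

    /// Option 2: ignore poisoning and keep using the guarded data.
    /// Reasonable for a cache, because a stale or missing entry can be refetched.
    pub fn get_or_recover(&self) -> Option<HashMap<i64, String>> {
        let guard = self.inner.read().unwrap_or_else(PoisonError::into_inner);
        (*guard).clone()
    }
}
```

Which option fits depends on whether a half-written cache is acceptable. Returning an error is the more conservative choice, while recovering the guard keeps the scanner usable after an unrelated panic.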
luoyuxia left a comment
@fresh-borzoni Thanks for the PR. LGTM
Summary
Adds admin operations and partition support to achieve feature parity with Rust/C++ bindings.
Admin Operations
- drop_table(table_path, ignore_if_not_exists=False) - Drop a table from the cluster
- list_offsets(table_path, bucket_ids, offset_type, timestamp=None) - List offsets for non-partitioned tables
- list_partition_offsets(table_path, partition_name, bucket_ids, offset_type, timestamp=None) - List offsets for partitioned tables
- create_partition(table_path, partition_spec, ignore_if_exists=False) - Create a partition
- list_partition_infos(table_path) - List all partitions for a table
LogScanner Low-Level API
Replaces high-level subscribe with low-level methods matching Rust/C++:
- subscribe(bucket_id, start_offset) - Subscribe to a single bucket
- subscribe_batch(bucket_offsets) - Subscribe to multiple buckets
- subscribe_partition(partition_id, bucket_id, start_offset) - Subscribe to a partitioned table bucket
- to_arrow() / to_pandas() now work for both partitioned and non-partitioned tables
Closes #148 #244